package org.apache.seatunnel.connectors.seatunnel.gaussdbredis.source;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;

import org.apache.seatunnel.connectors.seatunnel.gaussdbredis.config.GaussDBRedisConfig;
import org.apache.seatunnel.connectors.seatunnel.gaussdbredis.config.GaussDBRedisParameters;
import org.apache.seatunnel.connectors.seatunnel.gaussdbredis.exception.GaussDBRedisConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import java.util.Locale;

/**
 * Bounded, single-split SeaTunnel source registered under the plugin name {@code GaussDB-Redis}.
 *
 * <p>{@link #prepare(Config)} validates the plugin configuration and decides how values are
 * turned into rows:
 *
 * <ul>
 *   <li>{@code format = json} (requires a {@code schema}): rows follow the configured schema and
 *       are decoded with a {@link JsonDeserializationSchema};
 *   <li>no {@code format}, or any other supported format: rows use the simple text schema and no
 *       deserialization schema is handed to the reader.
 * </ul>
 */
@AutoService(SeaTunnelSource.class)
public class GaussDBRedisSource extends AbstractSingleSplitSource<SeaTunnelRow> {
    /** Connection and read parameters, populated from the plugin config in {@link #prepare}. */
    private final GaussDBRedisParameters redisParameters = new GaussDBRedisParameters();

    /** Row type produced by this source; assigned on every successful {@link #prepare} call. */
    private SeaTunnelRowType seaTunnelRowType;

    /** Decoder passed to the reader; {@code null} when the simple text schema is used. */
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;

    /** Returns the plugin identifier of this connector. */
    @Override
    public String getPluginName() {
        return "GaussDB-Redis";
    }

    /**
     * Validates {@code pluginConfig} and initializes the connection parameters, the produced row
     * type and the deserialization schema.
     *
     * @param pluginConfig the source configuration; must define host, port, key pattern and data
     *     type, and must define a schema whenever a format is given
     * @throws GaussDBRedisConnectorException with {@code CONFIG_VALIDATION_FAILED} when a required
     *     option is missing, a format is given without a schema, or the format value is unknown
     */
    @Override
    public void prepare(Config pluginConfig) throws PrepareFailException {
        CheckResult result =
                CheckConfigUtil.checkAllExists(
                        pluginConfig,
                        GaussDBRedisConfig.HOST.key(),
                        GaussDBRedisConfig.PORT.key(),
                        GaussDBRedisConfig.KEY_PATTERN.key(),
                        GaussDBRedisConfig.DATA_TYPE.key());
        if (!result.isSuccess()) {
            throw configValidationFailure(result.getMsg());
        }
        this.redisParameters.buildWithConfig(pluginConfig);

        if (pluginConfig.hasPath(GaussDBRedisConfig.FORMAT.key())) {
            if (!pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
                throw configValidationFailure(
                        "Must config schema when format parameter been config");
            }

            GaussDBRedisConfig.Format format =
                    parseFormat(pluginConfig.getString(GaussDBRedisConfig.FORMAT.key()));
            if (GaussDBRedisConfig.Format.JSON.equals(format)) {
                this.seaTunnelRowType =
                        CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
                this.deserializationSchema =
                        new JsonDeserializationSchema(false, false, seaTunnelRowType);
                return;
            }
        }

        // No format, or a format without a dedicated decoder: fall back to the simple text
        // schema so the produced type is never left null after a successful prepare().
        this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
        this.deserializationSchema = null;
    }

    /** Returns {@link Boundedness#BOUNDED}; this source always reports itself as bounded. */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /** Returns the row type selected by {@link #prepare(Config)}. */
    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return seaTunnelRowType;
    }

    /**
     * Creates the reader for the single split of this source.
     *
     * @param readerContext context supplied by the framework for the single-split reader
     * @return a reader built from the prepared parameters and deserialization schema (the schema
     *     may be {@code null} when the simple text schema is in use)
     */
    @Override
    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
            SingleSplitReaderContext readerContext) throws Exception {
        return new GaussDBRedisSourceReader(redisParameters, readerContext, deserializationSchema);
    }

    /**
     * Resolves the configured format name to its enum constant, ignoring case.
     *
     * <p>Upper-casing uses {@link Locale#ROOT} so the lookup does not depend on the JVM default
     * locale (e.g. Turkish dotted/dotless {@code i}).
     */
    private GaussDBRedisConfig.Format parseFormat(String rawFormat) {
        try {
            return GaussDBRedisConfig.Format.valueOf(rawFormat.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException unknownFormat) {
            // Enum.valueOf's exception only repeats the constant name; report the user's raw
            // value through the same validation error used for every other config problem.
            throw configValidationFailure(String.format("Unsupported format: %s", rawFormat));
        }
    }

    /** Builds the connector's standard config-validation exception for {@code message}. */
    private GaussDBRedisConnectorException configValidationFailure(String message) {
        return new GaussDBRedisConnectorException(
                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                String.format(
                        "PluginName: %s, PluginType: %s, Message: %s",
                        getPluginName(), PluginType.SOURCE, message));
    }
}
